﻿// *********************************************************************
// Copyright (c) Microsoft Corporation.  All rights reserved.
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using Microsoft.StreamProcessing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace SimpleTesting.PartitionedIngressAndEgress.ConceptualDemos
{
    [TestClass]
    public sealed class ConceptualDemos : TestWithConfigSettingsAndMemoryLeakDetection, IDisposable
    {
        // Demonstrates how a DisorderPolicy.Drop can both drop and, with reorderLatency, reorder out-of-order events
        [TestMethod, TestCategory("Gated")]
        public void ReorderLatencyDrop()
        {
            var disorderPolicy = DisorderPolicy.Drop(reorderLatency: 10);

            SetupQuery(disorderPolicy);

            // These will be buffered and reordered due to reorderLatency of 10
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 30, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 25, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 20, 0));

            // These will be dropped since they are before the partition's high watermark (30) - reorder latency (10) = 20
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 8, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 9, 0));

            // Each partition is independent, so a new partition is unaffected by another's high watermark/reorderLatency
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 1, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 10, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 5, 0));

            var expected = new PartitionedStreamEvent<int, int>[]
            {
                // Events are reordered within the reorderLatency
                PartitionedStreamEvent.CreatePoint(0, 20, 0),
                PartitionedStreamEvent.CreatePoint(0, 25, 0),
                PartitionedStreamEvent.CreatePoint(0, 30, 0),

                // Each partition acts independently
                PartitionedStreamEvent.CreatePoint(1, 1, 0),
                PartitionedStreamEvent.CreatePoint(1, 5, 0),
                PartitionedStreamEvent.CreatePoint(1, 10, 0),

                // Generated by the default OnCompletedPolicy EndOfStream
                PartitionedStreamEvent.CreateLowWatermark<int, int>(StreamEvent.InfinitySyncTime),
            };

            FinishQuery(expected);
        }

        // Demonstrates how a DisorderPolicy.Adjust can both adjust and, with reorderLatency, reorder out-of-order events
        [TestMethod, TestCategory("Gated")]
        public void ReorderLatencyAdjust()
        {
            var disorderPolicy = DisorderPolicy.Adjust(reorderLatency: 10);

            SetupQuery(disorderPolicy);

            // These will be buffered and reordered due to reorderLatency of 10
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 30, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 25, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 20, 0));

            // These will be adjusted to the minimum, since they are before the partition's high watermark (30) - reorder latency (10) = 20
            this.input.OnNext(PartitionedStreamEvent.CreateInterval(0, 8, 30, 0)); // Will be adjusted to 20
            this.input.OnNext(PartitionedStreamEvent.CreateInterval(0, 9, 30, 0)); // Will be adjusted to 20

            // This will be dropped, since Trill will not adjust intervals whose end times are before the minimum
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 8, 0)); // Will be dropped
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 9, 0)); // Will be dropped

            // Each partition is independent, so a new partition is unaffected by another's high watermark/reorderLatency
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 1, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 10, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 5, 0));

            var expected = new PartitionedStreamEvent<int, int>[]
            {
                // Events are reordered within the reorderLatency, and out of order events are adjusted to minimum
                PartitionedStreamEvent.CreatePoint(0, 20, 0),
                PartitionedStreamEvent.CreateInterval(0, 20, 30, 0),
                PartitionedStreamEvent.CreateInterval(0, 20, 30, 0),
                PartitionedStreamEvent.CreatePoint(0, 25, 0),
                PartitionedStreamEvent.CreatePoint(0, 30, 0),

                // Each partition acts independently
                PartitionedStreamEvent.CreatePoint(1, 1, 0),
                PartitionedStreamEvent.CreatePoint(1, 5, 0),
                PartitionedStreamEvent.CreatePoint(1, 10, 0),

                // Generated by the default OnCompletedPolicy EndOfStream
                PartitionedStreamEvent.CreateLowWatermark<int, int>(StreamEvent.InfinitySyncTime),
            };

            FinishQuery(expected);
        }

        // Demonstrates a low watermark's implications on partitions' lower bounds
        [TestMethod, TestCategory("Gated")]
        public void LowWatermark()
        {
            var disorderPolicy = DisorderPolicy.Drop(reorderLatency: 10);

            SetupQuery(disorderPolicy, PartitionedFlushPolicy.None);

            // This will be buffered due to reorderLatency
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 1, 0));

            // This will set the entire stream's lower bound to 100
            this.input.OnNext(PartitionedStreamEvent.CreateLowWatermark<int, int>(100));

            // If any partition, new or existing, specifies a timestamp before 100, it is considered out of order
            // and since we specify DisorderPolicy.Drop, they will be dropped
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 50, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 50, 0));

            // Any points at or after the low watermark of 100 will be processed as usual
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 100, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 100, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 105, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 105, 0));

            var expected = new PartitionedStreamEvent<int, int>[]
            {
                // Flushed in response to the default OnCompletedPolicy.EndOfStream
                PartitionedStreamEvent.CreatePoint(0, 1, 0),
                PartitionedStreamEvent.CreateLowWatermark<int, int>(100),
                // Points at time 50 are dropped
                PartitionedStreamEvent.CreatePoint(0, 100, 0),
                PartitionedStreamEvent.CreatePoint(1, 100, 0),
                PartitionedStreamEvent.CreatePoint(0, 105, 0),
                PartitionedStreamEvent.CreatePoint(1, 105, 0),

                // Generated by the default OnCompletedPolicy EndOfStream
                PartitionedStreamEvent.CreateLowWatermark<int, int>(StreamEvent.InfinitySyncTime),
            };

            FinishQuery(expected);
        }

        // Demonstrates how a low watermark with FlushOnLowWatermark policy induces a flush
        [TestMethod, TestCategory("Gated")]
        public void FlushOnLowWatermarkSimple()
        {
            SetupQuery(DisorderPolicy.Throw(reorderLatency: 10), PartitionedFlushPolicy.FlushOnLowWatermark,
                null, null, OnCompletedPolicy.None);

            // This will be buffered due to reorderLatency
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 1, 0));

            // Since we specify FlushOnLowWatermark, this will flush the point above, as well as the low watermark itself
            this.input.OnNext(PartitionedStreamEvent.CreateLowWatermark<int, int>(9000));

            var expected = new PartitionedStreamEvent<int, int>[]
            {
                // Flushed in response to low watermark
                PartitionedStreamEvent.CreatePoint(0, 1, 0),
                PartitionedStreamEvent.CreateLowWatermark<int, int>(9000),
            };

            FinishQuery(expected);
        }

        // Demonstrates how a low watermark with FlushOnLowWatermark policy induces a flush on other partitions
        [TestMethod, TestCategory("Gated")]
        public void FlushOnLowWatermarkDualStream()
        {
            SetupQuery(DisorderPolicy.Throw(reorderLatency: 500));

            // These will be buffered due to reorderLatency
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 1, 0));    // key 0: 1-2
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 101, 0));  // key 0: 101-102

            // This will batch the first point (0: 1-2)); but the second point ( key 0: 101-102) remains batched, since 101 > 100.
            // Since we use the default policy, FlushOnLowWatermark, the first point will also be flushed at this point.
            this.input.OnNext(PartitionedStreamEvent.CreateLowWatermark<int, int>(100));

            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 1005, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 1100, 0));

            // OnCompletedPolicy.EndOfStream adds the last two points to the batch then flushes all

            var expected = new PartitionedStreamEvent<int, int>[]
            {
                // Flushed in response to low watermark
                PartitionedStreamEvent.CreatePoint(0, 1, 0),
                PartitionedStreamEvent.CreateLowWatermark<int, int>(100),

                // Flushed in response to OnCompleted
                PartitionedStreamEvent.CreatePoint(0, 101, 0),
                PartitionedStreamEvent.CreatePoint(0, 1005, 0),
                PartitionedStreamEvent.CreatePoint(0, 1100, 0),

                // Generated by the default OnCompletedPolicy EndOfStream
                PartitionedStreamEvent.CreateLowWatermark<int, int>(StreamEvent.InfinitySyncTime),
            };

            FinishQuery(expected);
        }

        // Demonstrates how low watermarks are generated and the implications on partitions' lower bounds
        [TestMethod, TestCategory("Gated")]
        public void LowWatermarkGeneration()
        {
            SetupQuery(
                DisorderPolicy.Drop(),
                PartitionedFlushPolicy.None,
                PeriodicPunctuationPolicy.None(),
                PeriodicLowWatermarkPolicy.Time(generationPeriod: 100, lowWatermarkTimestampLag: 10));

            // This establishes three partitions with keys 0,1,2
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 0, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 0, 1));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 0, 2));

            // This will not yet generate a low watermark, since 99 - 0 is less than the generation period of 100
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 99, 0));

            // This will generate a low watermark, since (110 - lowWatermarkTimestampLag of 10) is greater than the
            // last watermark timestamp (in this case, none) + generation period (100).
            // The timestamp will be 110 - (lowWatermarkTimestampLag of 10) = 100.
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 110, 0));

            // These will be dropped due to our PartitionedDisorderPolicy.Drop, since the low watermark is now 100
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 90, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 99, 1));

            // These will be honored, since they are >= low watermark
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 100, 1));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 105, 2));

            // Though 100 >= low watermark, key 2's time is at 110, so this is out of order and will be dropped.
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 100, 0));

            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 120, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 150, 1));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 190, 2));

            // This will generate a low watermark, since (260 - lowWatermarkTimestampLag of 10) is greater than the
            // last watermark timestamp (100) + generation period (100).
            // The timestamp will be 260 - (lowWatermarkTimestampLag of 10) = 250.
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 260, 0));

            // An explicitly ingressed low watermark will update/reset the generationPeriod
            this.input.OnNext(PartitionedStreamEvent.CreateLowWatermark<int, int>(300));

            // Even though (370 - lowWatermarkTimestampLag of 10) is greater than the last automatically generated
            // low watermark timestamp (100) + the generation period (100)); the lowWatermarkGeneration period was
            // updated to start at 300, so the next generated low watermark will be at timestamp >=
            // (300 + generationPeriod of 100)
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 370, 1));

            // This will generate a low watermark, since (410 - lowWatermarkTimestampLag of 10) is >= the last low
            // watermark timestamp (300) + the generation period (100).
            // The timestamp will be 410 - (lowWatermarkTimestampLag of 10) = 400.
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 410, 1));

            var expected = new PartitionedStreamEvent<int, int>[]
            {
                PartitionedStreamEvent.CreatePoint(0, 0, 0),
                PartitionedStreamEvent.CreatePoint(1, 0, 1),
                PartitionedStreamEvent.CreatePoint(2, 0, 2),

                PartitionedStreamEvent.CreatePoint(0, 99, 0),

                // Generated automatically because of PeriodicLowWatermarkPolicy in response to point (key 2:100)
                PartitionedStreamEvent.CreateLowWatermark<int, int>(100),
                PartitionedStreamEvent.CreatePoint(2, 110, 0),

                PartitionedStreamEvent.CreatePoint(0, 100, 1),
                PartitionedStreamEvent.CreatePoint(1, 105, 2),

                PartitionedStreamEvent.CreatePoint(0, 120, 0),
                PartitionedStreamEvent.CreatePoint(1, 150, 1),
                PartitionedStreamEvent.CreatePoint(2, 190, 2),

                // Generated automatically because of PeriodicLowWatermarkPolicy in response to point (key 2:260),
                // snapped to the leftmost generationPeriod boundary
                PartitionedStreamEvent.CreateLowWatermark<int, int>(200),
                PartitionedStreamEvent.CreatePoint(2, 260, 0),

                // Explicitly ingressed low watermark
                PartitionedStreamEvent.CreateLowWatermark<int, int>(300),

                PartitionedStreamEvent.CreatePoint(0, 370, 1),

                // Generated automatically because of PeriodicLowWatermarkPolicy in response to point (key 0:410)
                PartitionedStreamEvent.CreateLowWatermark<int, int>(400),
                PartitionedStreamEvent.CreatePoint(0, 410, 1),

                // Generated by the default OnCompletedPolicy EndOfStream
                PartitionedStreamEvent.CreateLowWatermark<int, int>(StreamEvent.InfinitySyncTime),
            };

            FinishQuery(expected);
        }

        // LowWatermark generation acts only on ingressed timestamps, completely before reorderLatency buffering,
        // so this test case is identical to LowWatermarkGeneration in terms of when the low watermarks are generated.
        // However, low watermarks set a lower bound on all partitions, including their reorder latency buffers,
        // so disorder policies can be affected.
        [TestMethod, TestCategory("Gated")]
        public void LowWatermarkGenerationWithReorderLatency()
        {
            SetupQuery(
                DisorderPolicy.Drop(reorderLatency: 50),
                PartitionedFlushPolicy.None,
                PeriodicPunctuationPolicy.None(),
                PeriodicLowWatermarkPolicy.Time(generationPeriod: 100, lowWatermarkTimestampLag: 10));

            // This establishes three partitions with keys 0,1,2
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 0, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 0, 1));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 0, 2));

            // This will not yet generate a low watermark, since 99 - 0 is less than the generation period of 100
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 99, 0));

            // This will generate a low watermark, since (110 - lowWatermarkTimestampLag of 10) is greater than the
            // last watermark timestamp (in this case, none) + generation period (100).
            // The timestamp will be 110 - (lowWatermarkTimestampLag of 10) = 100.
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 110, 0));

            // These will be dropped due to our PartitionedDisorderPolicy.Drop, since the low watermark is now 100,
            // even as they approach the low watermark, since any event before the low watermark is out of order.
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 90, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 99, 1));

            // These will be honored, since they are >= low watermark
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 100, 1));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 105, 2));

            // This will be reordered since it is >= low watermark and within the partition's
            // high watermark (110) - reorderLatency (50)
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 100, 0));

            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 120, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 150, 1));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 190, 2));

            // This will generate a low watermark, since (260 - lowWatermarkTimestampLag of 10) is greater than the
            // last watermark timestamp (100) + generation period (100).
            // The timestamp will be 260 - (lowWatermarkTimestampLag of 10) = 250.
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 260, 0));

            // An explicitly ingressed low watermark will update/reset the generationPeriod
            this.input.OnNext(PartitionedStreamEvent.CreateLowWatermark<int, int>(300));

            // Even though (370 - lowWatermarkTimestampLag of 10) is greater than the last automatically generated
            // low watermark timestamp (100) + the generation period (100)); the lowWatermarkGeneration period was
            // updated to start at 300, so the next generated low watermark will be at timestamp >=
            // (300 + generationPeriod of 100)
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 370, 1));

            // This will generate a low watermark, since (410 - lowWatermarkTimestampLag of 10) is >= the last low
            // watermark timestamp (300) + the generation period (100).
            // The timestamp will be 410 - (lowWatermarkTimestampLag of 10) = 400.
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 410, 1));

            var expected = new PartitionedStreamEvent<int, int>[]
            {
                PartitionedStreamEvent.CreatePoint(0, 0, 0),
                PartitionedStreamEvent.CreatePoint(1, 0, 1),
                PartitionedStreamEvent.CreatePoint(2, 0, 2),

                PartitionedStreamEvent.CreatePoint(0, 99, 0),

                // Generated automatically because of PeriodicLowWatermarkPolicy in response to point (key 2:100)
                PartitionedStreamEvent.CreateLowWatermark<int, int>(100),

                PartitionedStreamEvent.CreatePoint(0, 100, 1),
                PartitionedStreamEvent.CreatePoint(2, 100, 0),
                PartitionedStreamEvent.CreatePoint(1, 105, 2),
                PartitionedStreamEvent.CreatePoint(2, 110, 0),

                PartitionedStreamEvent.CreatePoint(0, 120, 0),
                PartitionedStreamEvent.CreatePoint(1, 150, 1),
                PartitionedStreamEvent.CreatePoint(2, 190, 2),

                // Generated automatically because of PeriodicLowWatermarkPolicy in response to point (key 2:260),
                // snapped to the leftmost generationPeriod boundary
                PartitionedStreamEvent.CreateLowWatermark<int, int>(200),
                PartitionedStreamEvent.CreatePoint(2, 260, 0),

                // Explicitly ingressed low watermark
                PartitionedStreamEvent.CreateLowWatermark<int, int>(300),

                PartitionedStreamEvent.CreatePoint(0, 370, 1),

                // Generated automatically because of PeriodicLowWatermarkPolicy in response to point (key 0:410)
                PartitionedStreamEvent.CreateLowWatermark<int, int>(400),
                PartitionedStreamEvent.CreatePoint(0, 410, 1),

                // Generated by the default OnCompletedPolicy EndOfStream
                PartitionedStreamEvent.CreateLowWatermark<int, int>(StreamEvent.InfinitySyncTime),
            };

            FinishQuery(expected);
        }

        // This test demonstrates how low watermarks are generated alongside punctuations. Namely, a low watermark
        // implies a punctuation for every partition, and thus reset the punctuation generation period.
        [TestMethod, TestCategory("Gated")]
        public void LowWatermarkAndPunctuationGeneration()
        {
            SetupQuery(DisorderPolicy.Throw(), PartitionedFlushPolicy.None,
                PeriodicPunctuationPolicy.Time(generationPeriod: 20),
                PeriodicLowWatermarkPolicy.Time(generationPeriod: 50, lowWatermarkTimestampLag: 0));

            // This establishes three partitions with keys 0,1
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 0, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 0, 1));

            // These points do not generate punctuations or low watermarks since neither period has lapsed.
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 15, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 19, 1));

            // These points generate punctuations (period of 20) but not low watermarks (period of 50)
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 22, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 20, 1));

            // This point generates a low watermark, since it is >= the last watermark (none/0) + the low watermark
            // generation period of 50
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 50, 0));

            // These points do not generate punctuations, even though "punctuations" specifically haven't been generated
            // for these partitions since time 22/20, because the low watermark at 50 supersedes punctuations.
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 55, 1));

            // Punctuation generation continues once the punctuation generation period lapses from the low watermark
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 70, 2));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 71, 1));

            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 90, 2));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 91, 1));

            var expected = new PartitionedStreamEvent<int, int>[]
            {
                PartitionedStreamEvent.CreatePoint(0, 0, 0),
                PartitionedStreamEvent.CreatePoint(1, 0, 1),

                PartitionedStreamEvent.CreatePoint(0, 15, 0),
                PartitionedStreamEvent.CreatePoint(1, 19, 1),

                // Punctuations generated after the punctuation period of 20 from time 0, snapped to the leftmost generationPeriod boundary
                PartitionedStreamEvent.CreatePunctuation<int, int>(0, 20),
                PartitionedStreamEvent.CreatePoint(0, 22, 0),
                PartitionedStreamEvent.CreatePunctuation<int, int>(1, 20),
                PartitionedStreamEvent.CreatePoint(1, 20, 1),

                // Low watermark generated after the low watermark period of 50 from time 0
                PartitionedStreamEvent.CreateLowWatermark<int, int>(50),
                PartitionedStreamEvent.CreatePoint(0, 50, 0),

                PartitionedStreamEvent.CreatePoint(1, 55, 1),

                // Punctuations generated after the punctuation period of 20 from time 50 (low watermark time), snapped to the leftmost generationPeriod boundary
                PartitionedStreamEvent.CreatePunctuation<int, int>(0, 60),
                PartitionedStreamEvent.CreatePoint(0, 70, 2),
                PartitionedStreamEvent.CreatePunctuation<int, int>(1, 60),
                PartitionedStreamEvent.CreatePoint(1, 71, 1),

                // Punctuations generated after the punctuation period of 20 from times 60 (last punctuation time), snapped to the leftmost generationPeriod boundary
                PartitionedStreamEvent.CreatePunctuation<int, int>(0, 80),
                PartitionedStreamEvent.CreatePoint(0, 90, 2),
                PartitionedStreamEvent.CreatePunctuation<int, int>(1, 80),
                PartitionedStreamEvent.CreatePoint(1, 91, 1),

                // Generated by the default OnCompletedPolicy EndOfStream
                PartitionedStreamEvent.CreateLowWatermark<int, int>(StreamEvent.InfinitySyncTime),
            };

            FinishQuery(expected);
        }

        // This test demonstrates how punctuations are generated
        [TestMethod, TestCategory("Gated")]
        public void PunctuationTimeGeneration()
        {
            SetupQuery(
                DisorderPolicy.Throw(),
                PartitionedFlushPolicy.FlushOnLowWatermark,
                PeriodicPunctuationPolicy.Time(generationPeriod: 1000));

            // This establishes three partitions with keys 1,2,3
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 0, 0));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 0, 1));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 0, 2));

            // Since the next point arrives at 10001, which is exactly PeriodicPunctuationPolicy.generationPeriod after
            // the first point, this will generate a punctuation at 10001
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(0, 10001, 0));

            // These are still valid, since the punctuation generated above only affects key 0
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(1, 1, 1));
            this.input.OnNext(PartitionedStreamEvent.CreatePoint(2, 1, 2));

            var expected = new PartitionedStreamEvent<int, int>[]
            {
                // Flushed in response to OnCompleted
                PartitionedStreamEvent.CreatePoint(0, 0, 0),
                PartitionedStreamEvent.CreatePoint(1, 0, 1),
                PartitionedStreamEvent.CreatePoint(2, 0, 2),

                // Punctuation snapped to the leftmost generationPeriod boundary
                PartitionedStreamEvent.CreatePunctuation<int, int>(0, 10000),
                PartitionedStreamEvent.CreatePoint(0, 10001, 0),

                PartitionedStreamEvent.CreatePoint(1, 1, 1),
                PartitionedStreamEvent.CreatePoint(2, 1, 2),

                // Generated by the default OnCompletedPolicy EndOfStream
                PartitionedStreamEvent.CreateLowWatermark<int, int>(StreamEvent.InfinitySyncTime),
            };

            FinishQuery(expected);
        }

        private static void ValidateExpectedOutput(
            IEnumerable<PartitionedStreamEvent<int, int>> expected,
            IEnumerable<PartitionedStreamEvent<int, int>> output,
            bool validateByPartition = false)
        {
            if (validateByPartition)
            {
                for (int key = 0; key < 3; key++)
                {
                    var partitionExpected = expected.Where(@event => @event.IsData && @event.PartitionKey == key).ToList();
                    var partitionOutput = output.Where(@event => @event.IsData && @event.PartitionKey == key).ToList();
                    Assert.IsTrue(partitionExpected.SequenceEqual(partitionOutput));
                }
                var nonDataEventsExpected = expected.Where(@event => !@event.IsData).ToList();
                var nonDataEventsOutput = output.Where(@event => !@event.IsData).ToList();
                Assert.IsTrue(nonDataEventsExpected.SequenceEqual(nonDataEventsOutput));
            }
            else
            {
                Assert.IsTrue(expected.SequenceEqual(output));
            }
        }

        private Subject<PartitionedStreamEvent<int, int>> input;
        private List<PartitionedStreamEvent<int, int>> output;
        private Task egress;
        private Process process;
        private bool validateByPartition;

        private void SetupQuery(
            DisorderPolicy disorderPolicy,
            PartitionedFlushPolicy flushPolicy = PartitionedFlushPolicy.FlushOnLowWatermark,
            PeriodicPunctuationPolicy punctuationPolicy = null, // Default: PeriodicPunctuationPolicy.None()
            PeriodicLowWatermarkPolicy lowWatermarkPolicy = null, // Default: PeriodicLowWatermarkPolicy.None()
            OnCompletedPolicy completedPolicy = OnCompletedPolicy.EndOfStream)
        {
            var qc = new QueryContainer();
            this.input = new Subject<PartitionedStreamEvent<int, int>>();
            var ingress = qc.RegisterInput(this.input, disorderPolicy, flushPolicy, punctuationPolicy,
                lowWatermarkPolicy, completedPolicy);
            this.output = new List<PartitionedStreamEvent<int, int>>();
            this.egress = qc.RegisterOutput(ingress).ForEachAsync(o => this.output.Add(o));
            this.process = qc.Restore();
            this.validateByPartition = disorderPolicy.reorderLatency > 0;
        }

        private void FinishQuery(PartitionedStreamEvent<int, int>[] expected)
        {
            this.input.OnCompleted();
            this.process.Flush();
            this.egress.Wait();

            ValidateExpectedOutput(expected, this.output, this.validateByPartition);
        }

        #region IDisposable Support
        public void Dispose()
        {
            if (this.input != null)
            {
                this.input.Dispose();
                this.input = null;
            }
        }
        #endregion
    }
}
